package cn.doitedu.ml.bayes

import cn.doitedu.commons.util.SparkUtil
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row

import scala.collection.immutable

/**
  * @date: 2020/2/20
  * @site: www.doitedu.cn
  * @author: hunter.d 涛哥
  * @qq: 657270652
  * @description:
  * 对自然语言应用朴素贝叶斯算法来分类
  * 文本向量化代码手写版
  * 自然语言语义分析范畴
  */
object NLP_BayesClassify_HandWrite {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName)
    import spark.implicits._

    // 1.加载原始样本数据集
    val sample = spark.read.option("header", true).csv("userprofile/data/demo/bayes/nlp/sample/sample.txt")
    sample.cache()

    // 2.特征工程
    // 思想，按照样本集中词的(总种类数*10)，作为每一篇文章的向量长度
    // 利用hash映射算法，来将文章的一个词，映射到向量的某个固定位置特征上，特征值采用：这个词在本文章中的词频数

    // 求总词数
    sample.createTempView("sample")
    val cntRow: Array[Row] = spark.sql(
      """
        |select
        |count(distinct word) as cnts
        |from
        |(
        |select
        |explode(split(doc,' ')) as word
        |from sample
        |)
      """.stripMargin)
      .collect()

    val total: Int = cntRow(0).getAs[Long](0).toInt

    println(total)

    // 接下来，将样本集中的每一篇文档，变成一个total*10长度的特征向量
    val features = sample.rdd.map({
      case Row(docid: String, label: String, doc: String)
      => {
        (docid,label.toDouble,doc,doc2vec(total*10,doc))
      }
    }).toDF("docid","label","doc","features")
    features.show(100,false)



    // 调算法
    val naiveBayes = new NaiveBayes()
        .setFeaturesCol("features")
        .setLabelCol("label")
    val model = naiveBayes.fit(features)


    // 用训练好的模型，直接对原样本集进行预测
    val predict = model.transform(features)
    predict.drop("doc","features","rawPrediction").show(100,false)

    // 用模型，对未知数据进行预测
    val test = spark.read.option("header", true).csv("userprofile/data/demo/bayes/nlp/test")
    val testFeatures = test.rdd.map({
      case Row(docid:String,doc:String)
        =>{
        (docid,doc,doc2vec(total*10,doc))
      }
    }).toDF("docid","doc","features")
    val predict2 = model.transform(testFeatures)
    predict2.show(100,false)

    spark.close()
  }


  /**
    * 文档加工成特征向量的方法
    * Hash映射位置+词频特征值
    * @param vecLength
    * @param doc
    * @return
    */
  def doc2vec(vecLength:Int,doc:String): linalg.Vector ={
    //val arr: Array[Int] = (for(i<-0 to 29) yield 0).toArray
    // [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,......]
    val arr: Array[Double] = Vectors.zeros(vecLength).toArray

    // 切词，并统计该篇文档中每一个词的词频
    // Map(官网 -> 1, 苹果 -> 2, 安卓 -> 1, 骁龙 -> 1, 宣布 -> 1)
    val wordcount: Map[String, Int] = doc.split(" ").map((_,1)).groupBy(tp=>tp._1).mapValues(arr=>arr.map(tp=>tp._2).sum)

    // 遍历这个wordcount结果，将每个词的词频映射到特征向量中对应的位置上去
    for((k,v)<-wordcount){
      arr(k.hashCode % vecLength) = v
    }

    Vectors.dense(arr).toSparse
  }

}
